KAFKA-9929: Support backward iterator on WindowStore #9138
@ableegoldman, this PR is ready for review 👍
Part 2 is looking good!
One general question: should we mention that the key ordering will also be reversed? I'm pretty sure that it will be automatically (because of the byte layout). But key ordering isn't really "guaranteed" in the first place, so maybe it's best to not make any claims at all.
*/
@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
// note, this method must be kept if super#fetch(...) is removed
@SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
Do you know why we have all these ReadOnlyWindowStore methods also declared here in WindowStore? We don't need reverse variations of these I guess? 🤔
These methods were introduced when adding Duration/Instant support #5682.
I don't think these are needed, we can do a similar change as for SessionStore read operations. wdyt?
I guess the "note, this method must be kept if super#fetch(...) is removed" comments are making me nervous, but they could be out of date. Anyway, I don't think you need to clean all this up right now; I'm just wondering what's going on here.
Ok, given @mjsax's response on KAFKA-10434, it seems like we actually need to add the reverse variations of these long-based methods to the WindowStore API.
👍 I've added the long-based methods to WindowStore to align this.
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -426,7 +558,12 @@ private void getNextSegmentIterator() {
setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
GitHub won't let me comment up there, but on line 418, shouldn't we have to decrement the currentSegmentId for the reverse case? I'm a little confused because it looks like you have test coverage for the multi-segment case and it seems to pass. Maybe I'm just tired and missing something obvious here.
For example, in CachingWindowStoreTest#shouldFetchAndIterateOverKeyBackwardRange the results seem to go across multiple segments, but it looks like we actually do return the record from the largest segment first?
Will have to double check this. I have inverted the current/last segment for backwards use-case though.
public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
I feel like it's a little awkward to have the reverse variations accept time parameters as an Instant while the forward versions just use a long. I would have thought we could migrate the long methods to Instant at some point, but I see all these "note, this method must be kept if super#fetch(...) is removed" comments littered throughout the code... so maybe there's a reason for sticking with the long overrides in the innermost store layer?
Did you come across anything that suggested a reason for keeping the long flavors? cc @guozhangwang or @mjsax -- why can't we remove these?
Only backward compatibility. If it makes sense to remove these deprecations as part of this KIP, I'd be happy to help clean them up.
Ok, feel free to just create a followup ticket to see if we can clean things up. No need to block this PR on it
https://issues.apache.org/jira/browse/KAFKA-10434 created to follow this up.
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
...c/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
...e/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
...src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
this.keyFrom = keyFrom;
this.keyTo = keyTo;
this.timeTo = timeTo;
this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
this.forward = forward;

this.segmentInterval = cacheFunction.getSegmentInterval();
this.currentSegmentId = cacheFunction.segmentId(timeFrom);
We should start on the largest segment I think (largest segment == farthest advanced in time)
Great catch! I think this hasn't popped up yet in the tests because all tests may be using the same segment.
I will double check and add more tests to validate this.
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
new HashSet<>(asList(one, two, four, five)),
toSet(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
assertArrayEquals(
new LinkedHashSet<>(asList(one, two, four, five)).toArray(),
It feels kind of ridiculous to convert this from a list to a set to an array all in one line. Maybe we can use assertThat(result, equalTo(expectedResult)) here like we've started to do elsewhere in Streams?
Wow, this is embarrassing haha, looks like I was trying to get tests green without much thinking 😅
Haha actually I think I was the one who added the initial conversion from list to set to begin with, so I was clearly asking for trouble
throw new InvalidStateStoreException("Store is not open");
}
final List<KeyValue<Windowed<K>, V>> results = new ArrayList<>();
for (final long now : data.keySet()) {
Same here (and all the backwards ReadOnlyWindowStoreStub methods): I think we are kind of forced to invert the key ordering for the backwards fetch methods as well, even if we don't necessarily want to. Probably users shouldn't be relying on a strict ordering of the keys anyway but we do have to match the ordering of the cache
Thanks for the PR! One minor fix remaining on the InMemoryWindowStore but besides that this LGTM. Will ping @guozhangwang for a 2nd review & merge
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@ableegoldman key ordering is added to |
Made a pass over the non-testing part of the PR. I just have one question regarding the caching segmented range search (cc'ed @ableegoldman as well) that needs some clarification. Otherwise LGTM.
* @return The value or {@code null} if no value is found in the window
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException If {@code null} is used for any key.
nit: is this intentional? Also, I'd suggest we do not use the capitalized "If", to be consistent with the above line; ditto elsewhere below.
yes, IDE reformatting. I've fixed the capitalized If
ApiUtils.validateMillisecondInstant(timeTo, prepareMillisCheckFailMsgPrefix(timeTo, "timeTo")));
}

default KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom, final long timeTo) {
Since we are going to remove deprecated overloads with primitive long in the future, I think we do not need to expose a default function here, but just provide a default impl of the function in 204 below as UnsupportedOperation?
Idk, the current defaults make sense to me. If a user has a custom store and wants to use the new backwardFetchAll with both longs and Instants, all they'd have to do is override the long-based backwardFetchAll method (they have to implement the long version no matter what, since this is what gets used internally to Streams). If we just throw UnsupportedOperationException directly from the default implementation of the Instant-based backwardFetchAll, then they would have to override that as well in their custom store. So we should just let the Instant default to the long method so users only have to implement one method instead of two (plus they would have to do the Instant validation themselves, etc).
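To make the delegation argument concrete, here is a minimal sketch. SketchWindowStore and SketchInMemoryStore are hypothetical stand-ins, not the Kafka Streams interfaces, and Instant#toEpochMilli stands in for the validation the real code does through ApiUtils. It assumes the long-based default throws, so reverse iteration is opt-in.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for a window store interface.
interface SketchWindowStore<V> {

    // Long-based method: the one a custom store overrides to opt in.
    default List<V> backwardFetchAll(final long timeFrom, final long timeTo) {
        throw new UnsupportedOperationException("backwardFetchAll is not implemented");
    }

    // Instant-based overload: converts its arguments and delegates to the
    // long-based method, so overriding the long version enables both.
    default List<V> backwardFetchAll(final Instant timeFrom, final Instant timeTo) {
        return backwardFetchAll(timeFrom.toEpochMilli(), timeTo.toEpochMilli());
    }
}

// A custom store that overrides only the long-based method.
class SketchInMemoryStore implements SketchWindowStore<String> {
    private final NavigableMap<Long, String> data = new TreeMap<>();

    void put(final long timestamp, final String value) {
        data.put(timestamp, value);
    }

    @Override
    public List<String> backwardFetchAll(final long timeFrom, final long timeTo) {
        // Inclusive time range, newest timestamp first.
        return new ArrayList<>(data.subMap(timeFrom, true, timeTo, true).descendingMap().values());
    }
}
```

With this shape, a call that passes Instants reaches the overridden long-based method without the custom store touching the Instant overload.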
@guozhangwang do you mean the changes proposed in KIP-667? In that KIP we are removing long-based methods only from ReadOnlyWindowStore, but will keep them in WindowStore, unless you're referring to a different proposal.
I do agree with @ableegoldman. Users will have to opt in to reverse operations when using custom stores, and by default they only need to implement the long-based method to support both parameter options.
Ah yes, I was thinking about ReadOnlyWindowStore exposed in IQ only. All good.
* @throws NullPointerException If {@code null} is used for any key.
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds}
*/
KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant timeFrom, Instant timeTo)
This is out of the scope of this PR, but I'd like to point out that the current IQ does not actually obey the ordering when there are multiple local stores hosted on that instance. For example, if there are two stores from two tasks hosting keys {1, 3} and {2, 4}, then a range query of key [1, 4] would return in the order of 1, 3, 2, 4 but not 1, 2, 3, 4, since it is looping over the stores only. This would be the case for either forward or backward fetches on range-key-range-time.
For a single-key time-range fetch, of course, there's no such issue.
I think it's worth documenting this for now until we have a fix (and actually we are going to propose something soon).
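A small sketch of the effect described above, assuming a composite that simply visits each underlying store in turn. CompositeRangeSketch is a hypothetical name, and plain sorted sets stand in for the per-task stores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch; the real composite store wraps per-task state stores.
class CompositeRangeSketch {

    // Visit each store in turn and append its range results. Each store is
    // sorted on its own, but the concatenation is not globally sorted.
    static List<Integer> range(final List<NavigableSet<Integer>> stores, final int from, final int to) {
        final List<Integer> results = new ArrayList<>();
        for (final NavigableSet<Integer> store : stores) {
            results.addAll(store.subSet(from, true, to, true));
        }
        return results;
    }

    // Two stores hosting keys {1, 3} and {2, 4}, queried over [1, 4].
    static List<Integer> example() {
        final NavigableSet<Integer> first = new TreeSet<>(Arrays.asList(1, 3));
        final NavigableSet<Integer> second = new TreeSet<>(Arrays.asList(2, 4));
        return range(Arrays.asList(first, second), 1, 4);
    }
}
```

Here example() yields 1, 3, 2, 4, matching the order in the comment. A globally ordered result would need a merge across the per-store iterators rather than a concatenation.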
Yeah, @jeqo noticed that back in the KeyValueStore PR. That's a good point about clarifying this point in the javadocs, not sure if it makes sense to do so on the ReadOnlyWindowStore methods directly vs on some IQ-related method/class? Maybe we can just do a quick followup PR to shore up the documentation here without blocking this PR
#9137 (comment) old discussion for context.
@guozhangwang is this scenario only possible with composite stores? or are there other implementations where this is an alternative path?
The composite stores are the ones used in IQ, so IQ queries would be impacted by this. For processing, since it is always within a single task, there should be no problems.
thanks @guozhangwang ! I created https://issues.apache.org/jira/browse/KAFKA-10459 to follow up.
@@ -192,4 +192,3 @@ public void close() {
storeIterator.close();
}
}
Is this intentional? We usually have a newline at the end of the file, since some IDEs complain otherwise.
This looks strange. I can see there is a newline in my IDE. Seems that in trunk it had 2 newlines at the end. I can roll-back this change if needed.
segmentId(timeTo), true
);
final NavigableMap<Long, S> segmentsInRange;
if (forward) {
nit: we can just call subMap outside of the condition and only call descendingMap() based on the condition.
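For reference, the suggestion could look roughly like the sketch below. SegmentRangeSketch and its String segment values are hypothetical. Only NavigableMap#subMap and NavigableMap#descendingMap are real JDK calls.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the nit: take the sub-map once, flip it only if needed.
class SegmentRangeSketch {

    // Select segments whose ids fall in [fromId, toId], in iteration order.
    static NavigableMap<Long, String> segmentsInRange(final NavigableMap<Long, String> segments,
                                                      final long fromId,
                                                      final long toId,
                                                      final boolean forward) {
        final NavigableMap<Long, String> inRange = segments.subMap(fromId, true, toId, true);
        // descendingMap() is a view, so no entries are copied here.
        return forward ? inRange : inRange.descendingMap();
    }

    // Four example segments with ids 0 through 3.
    static NavigableMap<Long, String> exampleSegments() {
        final NavigableMap<Long, String> segments = new TreeMap<>();
        for (long id = 0; id <= 3; id++) {
            segments.put(id, "segment-" + id);
        }
        return segments;
    }
}
```

Both forms give the same iteration order; the difference is only where the variable is assigned.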
I'd prefer this immutable approach as it's the same used in keyValueStore changes, if that's ok.
SG, nit comments are just suggestions, not mandatory.
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom,
See my other comments: I think we do not need to add overloads for primitive types for the newly added APIs?
We would still need to keep this method: we're not removing all long-based APIs, just the public/IQ methods in ReadOnlyWindowStore. But we still want to keep the long-based methods on WindowStore and all the internal store interfaces for performance reasons.
Maybe once we move everything to use Instant all the way down to the serialization, then we can remove these long-based methods. I guess we should consider that when discussing KIP-667, but for the time being at least, we should keep them for internal use.
this.currentSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
this.lastSegmentId = cacheFunction.segmentId(timeFrom);

setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, maxObservedTimestamp.get()));
Why is this call different based on the forward boolean? It's not clear to me. cc @ableegoldman @lct45 could you double check?
This looks right to me -- in the iterator constructor, we would normally start from timeFrom (the minimum time) and advance to the end of the current segment (that's what the "cache key range" defines: the range of the current segment). When iterating backwards, the current segment is actually the largest segment, so the cache key lower range is the current (largest) segment's beginning timestamp, and the upper range is the maximum timestamp of the backwards fetch. Does that make sense?
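The explanation above can be sketched with plain arithmetic. SegmentWalkSketch is a hypothetical helper, not the CachingWindowStore code, and it assumes non-negative timestamps with segmentId = timestamp / segmentInterval. It also clamps the backward lower bound to timeFrom, whereas the quoted diff passes the segment begin time directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of segment visit order and the first cache key range.
class SegmentWalkSketch {

    // Assumed mapping from timestamp to segment id (non-negative timestamps).
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // Segment ids in the order the iterator would visit them.
    static List<Long> visitOrder(final long timeFrom, final long timeTo, final long maxObservedTimestamp,
                                 final long segmentInterval, final boolean forward) {
        final long lowest = segmentId(timeFrom, segmentInterval);
        final long highest = segmentId(Math.min(timeTo, maxObservedTimestamp), segmentInterval);
        final List<Long> order = new ArrayList<>();
        if (forward) {
            for (long id = lowest; id <= highest; id++) {
                order.add(id);
            }
        } else {
            for (long id = highest; id >= lowest; id--) {
                order.add(id);
            }
        }
        return order;
    }

    // {lower, upper} time bounds of the first segment the iterator reads.
    static long[] firstCacheRange(final long timeFrom, final long timeTo, final long maxObservedTimestamp,
                                  final long segmentInterval, final boolean forward) {
        final long upperTime = Math.min(timeTo, maxObservedTimestamp);
        if (forward) {
            // Start at timeFrom and stop at the end of the smallest segment.
            final long current = segmentId(timeFrom, segmentInterval);
            final long segmentLastTime = (current + 1) * segmentInterval - 1;
            return new long[] {timeFrom, Math.min(timeTo, segmentLastTime)};
        }
        // Backward: start in the largest segment and read up to the fetch's upper time.
        final long current = segmentId(upperTime, segmentInterval);
        final long segmentBeginTime = current * segmentInterval;
        // Clamping to timeFrom is this sketch's choice for the single-segment case.
        return new long[] {Math.max(timeFrom, segmentBeginTime), upperTime};
    }
}
```

For example, with timeFrom = 100, timeTo = 1700, a maximum observed timestamp of 1200 and a segment interval of 500, the forward walk visits segments 0, 1, 2 starting with the range [100, 499], while the backward walk visits 2, 1, 0 starting with [1000, 1200].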
SG
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
…rder (#11292)
When introducing the backward iterator for WindowStore in #9138, we forgot to iterate the records within each segment in reverse order (i.e. via descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.
Currently, a window store keeps records as [segments -> [records]]. For example, with window size = 500 and input records:
key: "a", value: "aa", timestamp: 0 ==> will be in the [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in the [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in the [500, 1000] window
So, internally, "a" and "b" will be in the same segment, and "c" in another segment. segments: [0 /* window start */, records], [500, records]. The records for window start 0 will be "a" and "b"; the records for window start 500 will be "c".
Before this change, we had a reverse iterator over the segments, but not over the records within each segment. So backwardFetchAll returned the records in the order "c", "a", "b", when it should be "c", "b", "a".
Reviewers: Jorge Esteban Quilcate Otoya <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
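The ordering bug in the commit message above can be reproduced with nested sorted maps. This is a sketch under the assumption that the store is shaped as window start -> (key -> value). InMemoryBackwardSketch and its methods are hypothetical names, not the InMemoryWindowStore code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: window start -> (key -> value), both levels sorted.
class InMemoryBackwardSketch {

    // The three records from the commit message, grouped by window start.
    static NavigableMap<Long, NavigableMap<String, String>> exampleStore() {
        final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
        segments.computeIfAbsent(0L, start -> new TreeMap<>()).put("a", "aa");
        segments.computeIfAbsent(0L, start -> new TreeMap<>()).put("b", "bb");
        segments.computeIfAbsent(500L, start -> new TreeMap<>()).put("c", "cc");
        return segments;
    }

    // Walk segments newest first. With reverseRecords = false only the outer
    // level is reversed, which is the behavior before the fix.
    static List<String> backwardKeys(final NavigableMap<Long, NavigableMap<String, String>> segments,
                                     final boolean reverseRecords) {
        final List<String> keys = new ArrayList<>();
        for (final NavigableMap<String, String> records : segments.descendingMap().values()) {
            keys.addAll(reverseRecords ? records.descendingKeySet() : records.keySet());
        }
        return keys;
    }
}
```

Calling backwardKeys(exampleStore(), false) gives c, a, b, while passing true gives c, b, a.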
Depends on #9137
Implements KIP-617 on WindowStore.
Testing strategy: extend existing tests to validate reverse operations are supported.